
redis-py, the Essential Tool for Using Redis from Python: Reading the Source Code

Python猫 2022-04-12

The following article is from 游戏不存在 Author 肖恩


Author: 肖恩

Source: 游戏不存在


  • Redis protocol specification

  • redis-py overview

  • Basic redis-py usage

    • RedisCommand

    • Redis connection

    • Connection pool

  • pipeline

  • LuaScript

  • lock

Redis protocol specification

RESP (Redis Serialization Protocol) is the wire protocol used between Redis clients and the Redis server. Some sample messages:

    +OK\r\n
    -Error message\r\n
    :1000\r\n
    $6\r\nfoobar\r\n
    *2\r\n$3\r\nfoo\r\n$3\r\nbar\r\n
    *3\r\n:1\r\n:2\r\n:3\r\n

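The protocol defines five types:

  1. The + prefix marks a simple string: the text follows and ends with \r\n. It is typically used for command status replies.

  2. The - prefix marks an error: it is followed by an error message, conventionally an error type and a description separated by a space, and ends with \r\n.

  3. The : prefix marks an integer: the number follows and ends with \r\n.

  4. The $ prefix marks a bulk (length-prefixed) string: it is followed by the byte length, \r\n, the string data, and a closing \r\n.

  5. The * prefix marks an array: it is followed by the element count and \r\n, and each element can be any of the types above, including another array.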

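The protocol also specifies how Null and similar values are represented; see the reference links for details. The exchange below shows the request and response for LLEN mylist:

    C: *2\r\n$4\r\nLLEN\r\n$6\r\nmylist\r\n
    S: :48293\r\n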

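  • The client sends the command LLEN mylist. It is serialized as a RESP array of length 2 whose two bulk strings are LLEN and mylist.

  • The server replies with the integer 48293, the length of the list mylist.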
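To make the request encoding concrete, here is a minimal sketch of serializing a command into a RESP array of bulk strings. The function name encode_command is made up for this illustration and is not part of redis-py.

```python
def encode_command(*args):
    """Serialize a command as a RESP array of bulk strings (illustrative helper)."""
    # Array header: '*' + number of elements + CRLF
    parts = [b"*" + str(len(args)).encode() + b"\r\n"]
    for arg in args:
        data = arg if isinstance(arg, bytes) else str(arg).encode("utf-8")
        # Bulk string: '$' + byte length + CRLF + data + CRLF
        parts.append(b"$" + str(len(data)).encode() + b"\r\n" + data + b"\r\n")
    return b"".join(parts)


print(encode_command("LLEN", "mylist"))
# prints b'*2\r\n$4\r\nLLEN\r\n$6\r\nmylist\r\n'
```

The length in each bulk string header counts bytes, not characters, which is why the argument is encoded before it is measured.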

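Request-Response is the interaction model of the Redis service, comparable to HTTP. The client sends a command, the server processes it and sends back a reply, so the exchange can be thought of as one question, one answer. Pipelining, pub/sub, and MONITOR are the exceptions.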

redis-py source overview

This article uses redis-py 3.5.3. Its files and modules are:

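Name          Description
client        the Redis API
connection    connections, the connection pool, and so on
exceptions    exceptions and errors
lock          the lock implementation
sentinel      the Sentinel connection extension
utils         utilities
_compat       compatibility layer for multiple Python versions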

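redis-py has no dependencies on other packages. The code base is small, about 6,000 lines, but understanding all of it still takes some time and background. This article starts from everyday redis-py usage, the same material the redis-py README covers, and shows how those basic features are implemented in the source.

Basic redis-py usage

RedisCommand

Basic usage of redis-py: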

    >>> import redis
    >>> r = redis.Redis(host='localhost', port=6379, db=0)
    >>> r.set('foo', 'bar')
    True
    >>> r.get('foo')
    b'bar'

Tracing the implementation in redis-py:

    # client.py

    class Redis(object):

        def __init__(self, host='localhost', port=6379,
                     db=0, ...):
            ...
            connection_pool = ConnectionPool(**kwargs)
            self.connection_pool = connection_pool
            ...

        def set(self, name, value, ex=None, px=None, nx=False, xx=False, keepttl=False):
            ...
            return self.execute_command('SET', *pieces)

        def get(self, name):
            return self.execute_command('GET', name)

        # COMMAND EXECUTION AND PROTOCOL PARSING
        def execute_command(self, *args, **options):
            "Execute a command and return a parsed response"
            pool = self.connection_pool
            command_name = args[0]
            # reuse a dedicated connection if there is one, else borrow from the pool
            conn = self.connection or pool.get_connection(command_name, **options)
            conn.send_command(*args)
            return self.parse_response(conn, command_name, **options)

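Note: to keep things readable, the sample code differs from the real source; complex logic, exception handling, and the like are omitted.

  • The Redis client first creates a connection pool, through which it connects to the Redis server.

  • The Redis class wraps every Redis command and executes them using the command pattern.

  • Executing a command means sending it over a connection, then reading and parsing the response. This matches the Request-Response model of the Redis protocol.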
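The flow in these bullets can be sketched without a server. Everything below is hypothetical: FakeConnection and MiniClient are stand-ins invented for this example, and the small callback table only loosely imitates how a client might turn the raw OK reply of SET into True.

```python
class FakeConnection:
    """Stand-in for a real connection: records commands and answers from a dict."""

    def __init__(self):
        self.sent = []
        self.store = {}

    def send_command(self, *args):
        self.sent.append(args)

    def read_response(self):
        # Answer the most recently sent command
        name, *rest = self.sent[-1]
        if name == "SET":
            self.store[rest[0]] = rest[1]
            return "OK"
        if name == "GET":
            return self.store.get(rest[0])
        raise ValueError("unsupported command: %s" % name)


class MiniClient:
    """Every public command method funnels into execute_command."""

    # Hypothetical post-processing table: raw reply -> Python value
    RESPONSE_CALLBACKS = {"SET": lambda reply: reply == "OK"}

    def __init__(self, connection):
        self.connection = connection

    def execute_command(self, *args):
        self.connection.send_command(*args)      # request
        reply = self.connection.read_response()  # response
        callback = self.RESPONSE_CALLBACKS.get(args[0])
        return callback(reply) if callback else reply

    def set(self, name, value):
        return self.execute_command("SET", name, value)

    def get(self, name):
        return self.execute_command("GET", name)


client = MiniClient(FakeConnection())
print(client.set("foo", "bar"), client.get("foo"))
```

Because each command method is a one-line wrapper, adding a new command only means adding another wrapper; the send-then-parse logic lives in one place.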

Redis connection

Next, look at how a connection is created and used:

    # connection.py

    class Connection(object):

        def __init__(...):
            self.host = host
            self.port = int(port)
            self._sock = self.connect()

        def connect(self):
            for res in socket.getaddrinfo(self.host, self.port, self.socket_type,
                                          socket.SOCK_STREAM):
                family, socktype, proto, canonname, socket_address = res
                sock = socket.socket(family, socktype, proto)
                ...
                # TCP_NODELAY
                sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
                # connect
                sock.connect(socket_address)
                ...
                return sock

        def pack_command(self, *args):
            output = []
            # split a multi-word command name such as 'CONFIG GET'
            args = tuple(args[0].encode().split()) + args[1:]
            ...
            # array header: *<number of arguments>\r\n
            buff = SYM_EMPTY.join((SYM_STAR, str(len(args)).encode(), SYM_CRLF))
            # imap comes from redis._compat
            for arg in imap(self.encoder.encode, args):
                arg_length = len(arg)
                # bulk string header: $<length>\r\n, then the argument itself
                buff = SYM_EMPTY.join(
                    (buff, SYM_DOLLAR, str(arg_length).encode(), SYM_CRLF))
                output.append(buff)
                output.append(arg)
                buff = SYM_CRLF
            ...
            output.append(buff)
            return output

        def send_command(self, *args, **kwargs):
            command = self.pack_command(*args)
            if isinstance(command, str):
                command = [command]
            # send every packed chunk over the socket
            for item in command:
                self._sock.sendall(item)

  • A Connection holds a socket connection.

  • When it receives a Redis command, it uses pack_command to serialize the command into RESP.

  • The packed data is sent over the socket.
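The pack-then-send steps can be tried without a Redis server. The sketch below assumes a local socket pair as a stand-in for the TCP connection; pack_command and send_over_socket are simplified helpers written for this example, not the redis-py functions.

```python
import socket


def pack_command(*args):
    """Return the command as a list of RESP chunks (simplified illustration)."""
    chunks = [b"*" + str(len(args)).encode() + b"\r\n"]
    for arg in args:
        data = str(arg).encode("utf-8")
        chunks.append(b"$" + str(len(data)).encode() + b"\r\n")
        chunks.append(data + b"\r\n")
    return chunks


def send_over_socket(*args):
    """Send a packed command through one end of a socket pair, read it at the other."""
    client_sock, server_sock = socket.socketpair()
    try:
        for chunk in pack_command(*args):
            client_sock.sendall(chunk)
        # Close the write side so the reader sees end-of-stream
        client_sock.shutdown(socket.SHUT_WR)
        received = []
        while True:
            data = server_sock.recv(1024)
            if not data:
                break
            received.append(data)
        return b"".join(received)
    finally:
        client_sock.close()
        server_sock.close()


print(send_over_socket("SET", "foo", "bar"))
```

The receiving side sees one continuous byte stream no matter how many sendall calls produced it, which is why the command can be sent chunk by chunk.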

    # connection.py

    class Connection(object):

        def __init__(..., parser_class=PythonParser, ...):
            self._parser = parser_class(socket_read_size=socket_read_size)

        def read_response(self):
            response = self._parser.read_response()
            return response

    class PythonParser(BaseParser):
        "Plain Python parsing class"

        def __init__(self, socket_read_size):
            self.socket_read_size = socket_read_size
            ...

        def on_connect(self, connection):
            # called once the connection's socket is ready
            self._sock = connection._sock
            self._buffer = SocketBuffer(self._sock,
                                        self.socket_read_size,
                                        connection.socket_timeout)
            self.encoder = connection.encoder

        def read_response(self):
            raw = self._buffer.readline()

            # first byte is the type prefix, the rest is the payload
            byte, response = raw[:1], raw[1:]

            # server returned an error
            if byte == b'-':
                response = nativestr(response)
                ...
            # single value
            elif byte == b'+':
                pass
            # int value (long comes from redis._compat)
            elif byte == b':':
                response = long(response)
            # bulk response
            elif byte == b'$':
                length = int(response)
                response = self._buffer.read(length)
            # multi-bulk response (xrange comes from redis._compat)
            elif byte == b'*':
                length = int(response)
                response = [self.read_response() for i in xrange(length)]
            if isinstance(response, bytes):
                response = self.encoder.decode(response)
            return response

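  • The connection creates a parser to read and parse server responses.

  • The default PythonParser reads socket data through a SocketBuffer.

  • read_response implements RESP parsing. Each line of data ends with \r\n; its first character is the response type and the rest is the content. For a multi-bulk response, the following lines are read in a loop, one call per element. It is worth reading this side by side with the protocol description and the request-sending code.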
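The parsing steps above can be sketched as a small standalone function. It reads from an in-memory io.BytesIO stream instead of a socket buffer, raises RuntimeError for error replies, and treats a length of -1 as Null; those choices are assumptions made for this illustration, not the redis-py implementation.

```python
import io


def read_response(stream):
    """Parse one RESP value from a binary stream (simplified sketch)."""
    line = stream.readline()
    if not line.endswith(b"\r\n"):
        raise ValueError("incomplete RESP line")
    prefix, payload = line[:1], line[1:-2]

    if prefix == b"+":                     # simple string
        return payload.decode()
    if prefix == b"-":                     # error
        raise RuntimeError(payload.decode())
    if prefix == b":":                     # integer
        return int(payload)
    if prefix == b"$":                     # bulk string
        length = int(payload)
        if length == -1:
            return None                    # Null bulk string
        data = stream.read(length + 2)     # body plus trailing CRLF
        return data[:-2]
    if prefix == b"*":                     # array: one recursive read per element
        length = int(payload)
        if length == -1:
            return None
        return [read_response(stream) for _ in range(length)]
    raise ValueError("unknown RESP prefix: %r" % prefix)


print(read_response(io.BytesIO(b"*2\r\n$3\r\nfoo\r\n$3\r\nbar\r\n")))
# prints [b'foo', b'bar']
```

Bulk strings are read by length rather than by line, so a value that itself contains \r\n is still parsed correctly.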

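PythonParser is a pure-Python implementation. For better performance, install hiredis as well, which provides a C-based parser, HiredisParser.

Connection pool

redis-py uses a connection pool to improve efficiency. There are three steps: create the pool, get a usable connection from it to execute the command, and release the connection when done: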

    # client.py
    pool = ConnectionPool(**kwargs)
    conn = pool.get_connection(command_name, **options)

    try:
        conn.send_command(*args)
        ...
    finally:
        ...
        pool.release(conn)

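Connections taken from the pool must always be released. You can use try/finally or a context manager; the code here uses the former.

The pool implementation: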

    # connection.py

    class ConnectionPool(object):

        def __init__(...):
            self._available_connections = []
            self._in_use_connections = set()

        def make_connection(self):
            "Create a new connection"
            return self.connection_class(**self.connection_kwargs)

        def get_connection(self, command_name, *keys, **options):
            # prefer an idle connection; create one only when none is available
            try:
                connection = self._available_connections.pop()
            except IndexError:
                connection = self.make_connection()
            self._in_use_connections.add(connection)
            ...
            connection.connect()
            return connection

        def release(self, connection):
            "Releases the connection back to the pool"
            try:
                self._in_use_connections.remove(connection)
            except KeyError:
                # Gracefully fail when a connection is returned to this pool
                # that the pool doesn't actually own
                pass

            self._available_connections.append(connection)

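  • Internally the pool tracks all connections with a list of available connections and a set of in-use connections.

  • When a connection is requested, the pool takes one from the available list first; if none is available it creates a new one.

  • Every connection handed out is added to the in-use set. If the connection is not yet connected, it is connected first.

  • When a connection is released it is removed from the in-use set and appended to the available list, where it waits to be reused.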
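The bookkeeping described in these bullets fits in a few lines. The sketch below is a toy model: MiniPool and FakeConnection are names invented here, and the connection() context manager is an addition for illustration that shows the alternative to try/finally at the call site.

```python
from contextlib import contextmanager


class FakeConnection:
    """Stand-in connection that only tracks whether connect() was called."""

    def __init__(self):
        self.connected = False

    def connect(self):
        self.connected = True


class MiniPool:
    def __init__(self, connection_class=FakeConnection):
        self.connection_class = connection_class
        self._available = []     # idle connections, ready for reuse
        self._in_use = set()     # connections currently handed out

    def get_connection(self):
        try:
            conn = self._available.pop()
        except IndexError:
            conn = self.connection_class()
        self._in_use.add(conn)
        if not conn.connected:
            conn.connect()
        return conn

    def release(self, conn):
        self._in_use.discard(conn)
        self._available.append(conn)

    @contextmanager
    def connection(self):
        """Borrow a connection and always give it back, even on error."""
        conn = self.get_connection()
        try:
            yield conn
        finally:
            self.release(conn)


pool = MiniPool()
with pool.connection() as first:
    pass
with pool.connection() as second:
    pass
print(first is second)   # prints True: the released connection was reused
```

Using a list as a stack for idle connections means the most recently released connection is reused first.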

At this point we have traced the full path of a single Redis command:

    r = redis.Redis(host='localhost', port=6379, db=0)
    r.set('foo', 'bar')

pipeline

Redis also supports pipelining: a batch of commands is sent together, and all the results are fetched afterwards:

    >>> r = redis.Redis(...)
    >>> pipe = r.pipeline()
    >>> pipe.set('foo', 'bar').sadd('faz', 'baz').incr('auto_number').execute()
    [True, True, 6]

Pipeline inherits from Redis and adds a few extensions:

    class Pipeline(Redis):

        def __init__(...):
            self.command_stack = []

        def execute_command(self, *args, **options):
            # queue the command instead of sending it; return self for chaining
            self.command_stack.append((args, options))
            return self

        def execute(self, raise_on_error=True):
            "Execute all the commands in the current pipeline"
            stack = self.command_stack
            execute = self._execute_pipeline
            conn = self.connection_pool.get_connection('MULTI', self.shard_hint)
            return execute(conn, stack, raise_on_error)

        def _execute_pipeline(self, connection, commands, raise_on_error):
            # build up all commands into a single request to increase network perf
            all_cmds = connection.pack_commands([args for args, _ in commands])
            connection.send_packed_command(all_cmds)

            # read one response per queued command, in order
            response = []
            for args, options in commands:
                response.append(
                    self.parse_response(connection, args[0], **options))

            return response

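  • The pipeline uses a stack to hold the batched commands temporarily and returns itself from each call, which is what makes the chained syntax work.

  • The commands are only sent when execute is called.

  • After sending, the server responses are read one by one and returned together as a single list.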
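The queue-then-flush idea can be sketched in isolation. In this hypothetical example MiniPipeline buffers commands and fake_server stands in for a single network round trip; neither is redis-py code.

```python
class MiniPipeline:
    def __init__(self, send_batch):
        self._send_batch = send_batch   # callable: list of commands -> list of replies
        self.command_stack = []

    def execute_command(self, *args):
        # Nothing is sent yet; the command is only queued
        self.command_stack.append(args)
        return self                      # returning self enables chaining

    def set(self, name, value):
        return self.execute_command("SET", name, value)

    def incr(self, name):
        return self.execute_command("INCR", name)

    def execute(self):
        # Flush the whole queue in one go and reset it
        commands, self.command_stack = self.command_stack, []
        return self._send_batch(commands)


def fake_server(commands):
    """Stand-in for one round trip: apply every command, return all replies."""
    store = {}
    replies = []
    for name, key, *rest in commands:
        if name == "SET":
            store[key] = rest[0]
            replies.append(True)
        elif name == "INCR":
            store[key] = int(store.get(key, 0)) + 1
            replies.append(store[key])
    return replies


pipe = MiniPipeline(fake_server)
print(pipe.set("foo", "bar").incr("n").incr("n").execute())
# prints [True, 1, 2]
```

The replies come back in the same order the commands were queued, which is how the caller matches results to commands.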

LuaScript

Redis can use Lua scripts to run several operations atomically, much like a transaction. Usage:

    >>> r = redis.Redis()
    >>> lua = """
    ... local value = redis.call('GET', KEYS[1])
    ... value = tonumber(value)
    ... return value * ARGV[1]"""
    >>> multiply = r.register_script(lua)
    >>> r.set('foo', 2)
    >>> multiply(keys=['foo'], args=[5])
    10

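  • Inside the Lua script, the two arrays KEYS and ARGV receive the parameters. The first element of KEYS (Lua arrays start at 1) is the key name, and the first element of ARGV is the multiplier.

  • The script has to be registered first.

  • redis-py passes the parameters to the script, runs it, and returns the result.

How scripts are implemented: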

    # client.py

    class Redis(object):

        def register_script(self, script):
            return Script(self, script)

        def script_load(self, script):
            "Load a Lua ``script`` into the script cache. Returns the SHA."
            return self.execute_command('SCRIPT LOAD', script)

        def evalsha(self, sha, numkeys, *keys_and_args):
            return self.execute_command('EVALSHA', sha, numkeys, *keys_and_args)

    class Script(object):
        "An executable Lua script object returned by ``register_script``"

        def __init__(self, registered_client, script):
            self.registered_client = registered_client
            self.script = script
            # Precalculate and store the SHA1 hex digest of the script.
            ...
            # hashlib.sha1 needs bytes; the elided code encodes a str script first
            self.sha = hashlib.sha1(script).hexdigest()

        def __call__(self, keys=[], args=[], client=None):
            "Execute the script, passing any required ``args``"
            if client is None:
                client = self.registered_client
            args = tuple(keys) + tuple(args)
            # make sure the Redis server knows about the script
            ...
            try:
                return client.evalsha(self.sha, len(keys), *args)
            except NoScriptError:
                # Maybe the client is pointed to a different server than the client
                # that created this instance?
                # Overwrite the sha just in case there was a discrepancy.
                self.sha = client.script_load(self.script)
                return client.evalsha(self.sha, len(keys), *args)

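  • The Lua script is loaded into the Redis server with SCRIPT LOAD, which returns a SHA value. The SHA can be reused, so the same script does not have to be loaded repeatedly.

  • The script is executed with EVALSHA.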
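The try-EVALSHA-then-load pattern can be exercised against a fake server. In this sketch FakeServer, MiniScript, and the local NoScriptError class are all made up for the example; the one fact relied on is that the script's identifier is the SHA1 hex digest of its body.

```python
import hashlib


class NoScriptError(Exception):
    """Raised by the fake server when a SHA is not in its script cache."""


class FakeServer:
    def __init__(self):
        self.scripts = {}   # sha -> script body
        self.loads = 0      # how many times SCRIPT LOAD was needed

    def script_load(self, script):
        sha = hashlib.sha1(script.encode()).hexdigest()
        self.scripts[sha] = script
        self.loads += 1
        return sha

    def evalsha(self, sha, numkeys, *keys_and_args):
        if sha not in self.scripts:
            raise NoScriptError(sha)
        # A real server would run the Lua code; here we just echo the call
        return ("ran", self.scripts[sha], numkeys, keys_and_args)


class MiniScript:
    def __init__(self, client, script):
        self.client = client
        self.script = script
        # The digest can be computed locally, before the server has seen the script
        self.sha = hashlib.sha1(script.encode()).hexdigest()

    def __call__(self, keys=(), args=()):
        params = tuple(keys) + tuple(args)
        try:
            return self.client.evalsha(self.sha, len(keys), *params)
        except NoScriptError:
            # First use on this server: upload the script, then retry
            self.sha = self.client.script_load(self.script)
            return self.client.evalsha(self.sha, len(keys), *params)


server = FakeServer()
multiply = MiniScript(server, "return 1")
multiply(keys=["foo"], args=[5])
multiply(keys=["foo"], args=[5])
print(server.loads)   # prints 1: the script body was uploaded only once
```

After the first call only the 40-character digest travels with each invocation, not the script body.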

lock

redis-py also provides a global lock that can synchronize across processes:

    try:
        with r.lock('my-lock-key', blocking_timeout=5) as lock:
            # code you want executed only after the lock has been acquired
            pass
    except LockError:
        # the lock wasn't acquired
        pass

Here is its implementation:

    # lock.py

    class Lock(object):

        # delete the key only if it still holds this lock's token
        LUA_RELEASE_SCRIPT = """
            local token = redis.call('get', KEYS[1])
            if not token or token ~= ARGV[1] then
                return 0
            end
            redis.call('del', KEYS[1])
            return 1
        """

        def __init__(...):
            ...
            self.redis = redis
            self.name = name
            self.local = threading.local() if self.thread_local else dummy()
            self.local.token = None
            cls = self.__class__
            client = self.redis
            cls.lua_release = client.register_script(cls.LUA_RELEASE_SCRIPT)

        def __enter__(self):
            # force blocking, as otherwise the user would have to check whether
            # the lock was actually acquired or not.
            if self.acquire(blocking=True):
                return self
            raise LockError("Unable to acquire lock within the time specified")

        def __exit__(self, exc_type, exc_value, traceback):
            self.release()

        def acquire(self, blocking=None, blocking_timeout=None, token=None):
            ...
            token = uuid.uuid1().hex.encode()
            # NX: set only if the key does not exist; PX: expiry in milliseconds
            self.redis.set(self.name, token, nx=True, px=timeout)
            ...
            self.local.token = token
            ...

        def release(self):
            expected_token = self.local.token
            self.local.token = None
            self.lua_release(keys=[self.name],
                             args=[expected_token],
                             client=self.redis)

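  • LUA_RELEASE_SCRIPT uses a Lua script so that checking and deleting the token happen as one atomic operation.

  • The lock stores the token in a thread-local variable, so it behaves correctly under multithreaded concurrency.

  • __enter__ and __exit__ implement the context manager protocol, which guarantees the lock is acquired and released properly.

  • Acquiring the lock generates a temporary token and sets it on the Redis server. The token has a lifetime, so the lock is released automatically when it times out.

  • Releasing the lock clears both the thread-local variable and the key on the Redis server.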
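The token check is what keeps one holder from releasing another holder's lock. Here is a toy model of that rule with an in-memory dictionary in place of Redis. FakeRedis, MiniLock, and their method names are invented for this sketch, and expiry and blocking are left out.

```python
import uuid


class FakeRedis:
    """In-memory stand-in for the two server-side operations a lock needs."""

    def __init__(self):
        self.data = {}

    def set_nx(self, key, value):
        # Like SET with NX: succeed only if the key does not exist yet
        if key in self.data:
            return False
        self.data[key] = value
        return True

    def release_if_owner(self, key, token):
        # Same rule as the Lua release script: delete only on a token match
        if key not in self.data or self.data[key] != token:
            return 0
        del self.data[key]
        return 1


class MiniLock:
    def __init__(self, redis, name):
        self.redis = redis
        self.name = name
        self.token = None

    def acquire(self):
        token = uuid.uuid1().hex.encode()   # unique per acquisition
        if self.redis.set_nx(self.name, token):
            self.token = token
            return True
        return False

    def release(self):
        expected, self.token = self.token, None
        if not self.redis.release_if_owner(self.name, expected):
            raise RuntimeError("cannot release a lock that is not owned")

    def __enter__(self):
        if self.acquire():
            return self
        raise RuntimeError("unable to acquire lock")

    def __exit__(self, exc_type, exc_value, traceback):
        self.release()


store = FakeRedis()
with MiniLock(store, "my-lock-key"):
    print(MiniLock(store, "my-lock-key").acquire())   # prints False: already held
print("my-lock-key" in store.data)                    # prints False: released on exit
```

On a real server the check and the delete must happen atomically, which is the reason the source does the release inside a Lua script rather than with separate GET and DEL calls.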

TODO

Other parts of the source, such as publish/subscribe, Monitor, Sentinel, and transactions, are in my view off the main path and are left for a later article.

Reference links

  • https://redis.io/topics/protocol

  • https://github.com/andymccurdy/redis-py

  • https://pypi.org/project/hiredis/#description
